CheckpointStore 接口 API 参考文档 #
本文档中引用的文件
目录 #
简介 #
CheckpointStore 接口是 langgraphgo 中负责图执行状态持久化的核心组件。它提供了统一的抽象层,允许开发者选择不同的存储后端(内存、文件系统、数据库等)来保存和恢复图的执行状态。该接口确保了图执行过程中的状态可恢复性、持久性和一致性。
检查点存储在分布式系统和长时间运行的任务中尤为重要,它使得系统能够在崩溃后从上次的状态继续执行,避免重复工作并保持数据完整性。
接口定义 #
CheckpointStore 接口定义了五个核心方法,每个方法都有明确的职责和语义:
classDiagram
class CheckpointStore {
<<interface>>
+Save(ctx Context, checkpoint *Checkpoint) error
+Load(ctx Context, checkpointID string) (*Checkpoint, error)
+List(ctx Context, executionID string) ([]*Checkpoint, error)
+Delete(ctx Context, checkpointID string) error
+Clear(ctx Context, executionID string) error
}
class Checkpoint {
+string ID
+string NodeName
+interface State
+map[string]interface Metadata
+time.Time Timestamp
+int Version
}
class MemoryCheckpointStore {
-map[string]*Checkpoint checkpoints
-sync.RWMutex mutex
+Save(ctx Context, checkpoint *Checkpoint) error
+Load(ctx Context, checkpointID string) (*Checkpoint, error)
+List(ctx Context, executionID string) ([]*Checkpoint, error)
+Delete(ctx Context, checkpointID string) error
+Clear(ctx Context, executionID string) error
}
class PostgresCheckpointStore {
-DBPool pool
-string tableName
+Save(ctx Context, checkpoint *Checkpoint) error
+Load(ctx Context, checkpointID string) (*Checkpoint, error)
+List(ctx Context, executionID string) ([]*Checkpoint, error)
+Delete(ctx Context, checkpointID string) error
+Clear(ctx Context, executionID string) error
+InitSchema(ctx Context) error
+Close()
}
class RedisCheckpointStore {
-redis.Client client
-string prefix
-time.Duration ttl
+Save(ctx Context, checkpoint *Checkpoint) error
+Load(ctx Context, checkpointID string) (*Checkpoint, error)
+List(ctx Context, executionID string) ([]*Checkpoint, error)
+Delete(ctx Context, checkpointID string) error
+Clear(ctx Context, executionID string) error
}
class SqliteCheckpointStore {
-sql.DB db
-string tableName
+Save(ctx Context, checkpoint *Checkpoint) error
+Load(ctx Context, checkpointID string) (*Checkpoint, error)
+List(ctx Context, executionID string) ([]*Checkpoint, error)
+Delete(ctx Context, checkpointID string) error
+Clear(ctx Context, executionID string) error
+InitSchema(ctx Context) error
+Close() error
}
CheckpointStore <|.. MemoryCheckpointStore
CheckpointStore <|.. PostgresCheckpointStore
CheckpointStore <|.. RedisCheckpointStore
CheckpointStore <|.. SqliteCheckpointStore
CheckpointStore --> Checkpoint : manages
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L22-L38)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L22-L25)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L13-L17)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L13-L16)
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L22-L38)
核心方法详解 #
Save 方法 #
函数签名:
Save(ctx context.Context, checkpoint *Checkpoint) error
功能描述: 将指定的检查点保存到存储后端。如果检查点已存在,则更新现有记录。
参数说明:
ctx: 上下文对象,用于控制操作的生命周期和超时checkpoint: 要保存的检查点对象,包含完整的状态信息
返回值:
error: 操作成功返回 nil,失败返回具体错误
错误类型:
- 数据库连接失败
- 序列化/反序列化错误
- 存储空间不足
- 权限不足
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L24-L24)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L91-L135)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L57-L84)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L77-L121)
Load 方法 #
函数签名:
Load(ctx context.Context, checkpointID string) (*Checkpoint, error)
功能描述: 根据检查点 ID 从存储后端检索特定的检查点。
参数说明:
ctx: 上下文对象checkpointID: 要加载的检查点唯一标识符
返回值:
*Checkpoint: 找到的检查点对象error: 操作结果,未找到时返回fmt.Errorf("checkpoint not found: %s", checkpointID)
错误类型:
- 检查点不存在
- 数据库查询失败
- 反序列化错误
- 网络连接问题
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L27-L27)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L137-L176)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L86-L103)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L123-L162)
List 方法 #
函数签名:
List(ctx context.Context, executionID string) ([]*Checkpoint, error)
功能描述: 获取指定执行的所有检查点,按时间戳升序排列。
参数说明:
ctx: 上下文对象executionID: 执行实例的唯一标识符
返回值:
[]*Checkpoint: 包含所有相关检查点的切片error: 操作结果
错误类型:
- 查询执行失败
- 数据格式不正确
- 存储后端不可用
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L30-L30)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L178-L229)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L105-L155)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L164-L215)
Delete 方法 #
函数签名:
Delete(ctx context.Context, checkpointID string) error
功能描述: 删除指定的检查点。
参数说明:
ctx: 上下文对象checkpointID: 要删除的检查点标识符
返回值:
error: 操作结果,成功返回 nil
错误类型:
- 检查点不存在
- 删除权限不足
- 存储后端只读
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L33-L33)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L231-L239)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L157-L181)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L217-L225)
Clear 方法 #
函数签名:
Clear(ctx context.Context, executionID string) error
功能描述: 删除指定执行的所有检查点。
参数说明:
ctx: 上下文对象executionID: 执行实例标识符
返回值:
error: 操作结果
错误类型:
- 清理操作失败
- 存储后端异常
- 权限问题
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L36-L36)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L241-L249)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L183-L211)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L227-L236)
检查点数据结构 #
检查点(Checkpoint)是存储图执行状态的核心数据结构,包含了执行过程中的关键信息:
classDiagram
class Checkpoint {
+string ID
+string NodeName
+interface State
+map[string]interface Metadata
+time.Time Timestamp
+int Version
}
class MetadataFields {
+string execution_id
+string event
+string source
+string updated_by
+any custom_fields
}
Checkpoint --> MetadataFields : contains
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L12-L20)
字段说明 #
| 字段名 | 类型 | 描述 | 必需 |
|---|---|---|---|
| ID | string | 检查点的唯一标识符 | 是 |
| NodeName | string | 触发检查点的节点名称 | 是 |
| State | interface{} | 图的当前状态数据 | 是 |
| Metadata | map[string]interface{} | 元数据信息,包含执行上下文 | 否 |
| Timestamp | time.Time | 检查点创建的时间戳 | 是 |
| Version | int | 检查点版本号,用于冲突解决 | 是 |
元数据字段 #
| 字段名 | 类型 | 描述 | 示例值 |
|---|---|---|---|
| execution_id | string | 执行实例标识符 | “exec_123456789” |
| event | string | 触发事件类型 | “step”, “resume”, “manual” |
| source | string | 操作来源 | “auto_save”, “update_state” |
| updated_by | string | 更新节点名称 | “process_node” |
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L12-L20)
实现示例 #
内存存储实现 #
内存存储是最简单的实现,适用于测试和临时状态保存:
sequenceDiagram
participant Client as 客户端
participant MemStore as MemoryCheckpointStore
participant Mutex as 互斥锁
Client->>MemStore : Save(checkpoint)
MemStore->>Mutex : Lock()
MemStore->>MemStore : 存储到内存映射
MemStore->>Mutex : Unlock()
MemStore-->>Client : 返回错误
Client->>MemStore : Load(checkpointID)
MemStore->>Mutex : RLock()
MemStore->>MemStore : 从内存映射查找
MemStore->>Mutex : RUnlock()
MemStore-->>Client : 返回检查点或错误
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L40-L111)
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L40-L111)
PostgreSQL 存储实现 #
PostgreSQL 实现支持高并发和持久化存储:
sequenceDiagram
participant Client as 客户端
participant PGStore as PostgresCheckpointStore
participant Pool as 连接池
participant DB as PostgreSQL
Client->>PGStore : Save(checkpoint)
PGStore->>PGStore : JSON序列化状态和元数据
PGStore->>Pool : Exec(query, params)
Pool->>DB : INSERT/UPDATE 语句
DB-->>Pool : 执行结果
Pool-->>PGStore : 响应
PGStore-->>Client : 返回错误
Client->>PGStore : Load(checkpointID)
PGStore->>Pool : QueryRow(query, id)
Pool->>DB : SELECT 语句
DB-->>Pool : 行数据
Pool-->>PGStore : 结果集
PGStore->>PGStore : JSON反序列化
PGStore-->>Client : 返回检查点
图表来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L91-L135)
节来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L22-L250)
Redis 存储实现 #
Redis 实现提供高性能的缓存式存储:
sequenceDiagram
participant Client as 客户端
participant RedisStore as RedisCheckpointStore
participant Redis as Redis服务器
Client->>RedisStore : Save(checkpoint)
RedisStore->>RedisStore : JSON序列化
RedisStore->>RedisStore : 构建管道命令
Note over RedisStore : SET + SADD + Expire
RedisStore->>Redis : Pipeline执行
Redis-->>RedisStore : 批量响应
RedisStore-->>Client : 返回结果
Client->>RedisStore : List(executionID)
RedisStore->>Redis : SMembers(execKey)
Redis-->>RedisStore : 检查点ID列表
RedisStore->>Redis : MGet(checkpointKeys)
Redis-->>RedisStore : 检查点数据
RedisStore->>RedisStore : 并行反序列化
RedisStore-->>Client : 返回检查点列表
图表来源
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L57-L155)
节来源
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L13-L212)
SQLite 存储实现 #
SQLite 实现提供轻量级的本地存储:
节来源
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L13-L236)
线程安全与事务一致性 #
线程安全保证 #
CheckpointStore 接口的实现必须保证线程安全,特别是在多并发访问的情况下:
flowchart TD
A[并发请求] --> B{检查点操作类型}
B --> |读操作| C[读锁保护]
B --> |写操作| D[写锁保护]
C --> E[共享访问]
D --> F[独占访问]
E --> G[同时多个读取]
F --> H[串行写入]
G --> I[返回结果]
H --> J[更新存储]
J --> I
style C fill:#e1f5fe
style D fill:#fff3e0
style G fill:#e8f5e8
style H fill:#ffebee
实现策略 #
-
内存同步原语:
- 使用
sync.RWMutex提供读写锁 - 读操作共享锁,写操作独占锁
- 使用
-
数据库事务:
- 使用数据库连接池管理事务
- 支持 ACID 特性
-
分布式锁:
- Redis 实现使用原子操作
- 数据库实现使用行级锁
事务一致性 #
stateDiagram-v2
[*] --> 开始事务
开始事务 --> 验证数据
验证数据 --> 执行操作
执行操作 --> 检查结果
检查结果 --> 提交事务 : 成功
检查结果 --> 回滚事务 : 失败
提交事务 --> [*]
回滚事务 --> [*]
执行操作 --> 错误处理
错误处理 --> 回滚事务
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L40-L111)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L91-L135)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L57-L84)
自定义存储后端开发指南 #
开发步骤 #
-
实现接口:
type CustomCheckpointStore struct { // 存储后端特定配置 } func (s *CustomCheckpointStore) Save(ctx context.Context, checkpoint *Checkpoint) error { // 实现保存逻辑 } // 实现其他方法... -
错误处理:
- 统一错误类型转换
- 提供有意义的错误消息
- 支持重试机制
-
性能优化:
- 连接池管理
- 缓存策略
- 批量操作
集成方式 #
classDiagram
class CustomStore {
+Save(ctx Context, checkpoint *Checkpoint) error
+Load(ctx Context, checkpointID string) (*Checkpoint, error)
+List(ctx Context Context, executionID string) ([]*Checkpoint, error)
+Delete(ctx Context, checkpointID string) error
+Clear(ctx Context, executionID string) error
}
class CheckpointableRunnable {
-CheckpointStore store
-CheckpointConfig config
+Invoke(ctx Context, initialState interface) (interface, error)
+SaveCheckpoint(ctx Context, nodeName string, state interface) error
+LoadCheckpoint(ctx Context, checkpointID string) (*Checkpoint, error)
+ListCheckpoints(ctx Context) ([]*Checkpoint, error)
+ResumeFromCheckpoint(ctx Context, checkpointID string) (interface, error)
+ClearCheckpoints(ctx Context) error
}
class GraphEngine {
-CheckpointableRunnable runnable
+Execute(config Config) (interface, error)
+Resume(config Config) (interface, error)
}
CustomStore ..|> CheckpointStore
CheckpointableRunnable --> CheckpointStore
GraphEngine --> CheckpointableRunnable
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L22-L38)
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L214-L295)
最佳实践 #
-
错误处理:
func (s *CustomStore) Save(ctx context.Context, checkpoint *Checkpoint) error { // 验证输入 if checkpoint == nil { return fmt.Errorf("checkpoint cannot be nil") } // 执行操作 err := s.performSave(ctx, checkpoint) if err != nil { return fmt.Errorf("failed to save checkpoint %s: %w", checkpoint.ID, err) } return nil } -
资源管理:
func (s *CustomStore) Close() error { // 清理资源 if s.connection != nil { return s.connection.Close() } return nil } -
配置验证:
func NewCustomStore(config StoreConfig) (*CustomCheckpointStore, error) { if err := config.Validate(); err != nil { return nil, fmt.Errorf("invalid configuration: %w", err) } return &CustomCheckpointStore{config: config}, nil }
节来源
- [examples/durable_execution/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/durable_execution/main.go#L16-L89)
性能优化建议 #
存储层优化 #
-
批量操作:
- 使用数据库事务批量插入
- Redis 管道操作
- 文件系统批量写入
-
索引优化:
- 在
execution_id字段上建立索引 - 在
timestamp字段上建立排序索引 - 使用复合索引提高查询效率
- 在
-
缓存策略:
- LRU 缓存最近访问的检查点
- 预加载常用检查点
- 分层缓存架构
网络优化 #
flowchart LR
A[客户端请求] --> B[连接池]
B --> C[负载均衡器]
C --> D[存储后端集群]
D --> E[主节点]
D --> F[从节点1]
D --> G[从节点2]
E --> H[持久化存储]
F --> I[读缓存]
G --> J[读缓存]
style B fill:#e3f2fd
style C fill:#f3e5f5
style D fill:#e8f5e8
style H fill:#ffebee
style I fill:#e8f5e8
style J fill:#e8f5e8
内存优化 #
-
对象池:
- 复用 Checkpoint 对象
- 减少垃圾回收压力
- 控制内存分配
-
压缩存储:
- 使用 gzip 压缩状态数据
- 选择合适的数据序列化格式
- 实现增量存储
监控指标 #
| 指标类型 | 指标名称 | 描述 | 目标值 |
|---|---|---|---|
| 延迟 | SaveLatency | 保存检查点的平均延迟 | < 10ms |
| 延迟 | LoadLatency | 加载检查点的平均延迟 | < 5ms |
| 吞吐量 | Throughput | 每秒处理的检查点数量 | > 1000 ops/s |
| 错误率 | ErrorRate | 操作失败率 | < 0.1% |
| 资源使用 | MemoryUsage | 内存使用量 | < 512MB |
| 资源使用 | CPUUsage | CPU 使用率 | < 30% |
典型使用场景 #
场景一:自动检查点保存 #
sequenceDiagram
participant Graph as 图执行器
participant Listener as 检查点监听器
participant Store as 存储后端
participant Timer as 定时器
Graph->>Listener : 执行节点完成
Listener->>Listener : 创建检查点
Listener->>Store : 异步保存
Store-->>Listener : 确认保存
Timer->>Listener : 定时触发
Listener->>Listener : 检查是否需要保存
Listener->>Store : 批量保存
Store-->>Listener : 批量确认
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L298-L330)
场景二:手动检查点管理 #
flowchart TD
A[开始执行] --> B[定期保存检查点]
B --> C{用户请求保存?}
C --> |是| D[手动保存检查点]
C --> |否| E[继续执行]
D --> F[更新元数据]
F --> E
E --> G{执行完成?}
G --> |否| B
G --> |是| H[清理检查点]
H --> I[结束]
场景三:故障恢复 #
flowchart TD
A[系统启动] --> B[查找检查点]
B --> C{找到检查点?}
C --> |是| D[加载最新检查点]
C --> |否| E[从初始状态开始]
D --> F[验证检查点完整性]
F --> G{验证通过?}
G --> |是| H[从检查点恢复]
G --> |否| I[尝试下一个检查点]
I --> F
H --> J[继续执行]
E --> J
J --> K[执行完成]
节来源
- [examples/checkpointing/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/main.go#L1-119)
- [examples/durable_execution/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/durable_execution/main.go#L150-242)
故障排除 #
常见问题及解决方案 #
| 问题类型 | 症状 | 可能原因 | 解决方案 |
|---|---|---|---|
| 连接失败 | “connection refused” | 存储服务不可用 | 检查服务状态,重启连接 |
| 序列化错误 | “json marshal failed” | 状态数据包含不可序列化类型 | 使用兼容的数据类型 |
| 权限错误 | “permission denied” | 访问权限不足 | 检查用户权限设置 |
| 磁盘空间不足 | “no space left” | 存储空间耗尽 | 清理旧检查点,扩展存储 |
| 数据损坏 | “corruption detected” | 存储介质故障 | 从备份恢复,更换硬件 |
调试技巧 #
-
启用详细日志:
logger := log.New(os.Stdout, "CHECKPOINT: ", log.LstdFlags|log.Lshortfile) -
监控关键指标:
metrics := prometheus.NewCounterVec( prometheus.CounterOpts{Name: "checkpoint_operations"}, []string{"operation", "status"}, ) -
健康检查:
func (s *Store) HealthCheck(ctx context.Context) error { // 测试基本操作 return s.Save(ctx, &Checkpoint{ID: "health_check"}) }
性能诊断 #
flowchart TD
A[性能问题] --> B[收集指标]
B --> C[分析瓶颈]
C --> D{瓶颈类型}
D --> |CPU| E[优化算法]
D --> |内存| F[减少分配]
D --> |I/O| G[使用缓存]
D --> |网络| H[优化连接]
E --> I[重新测试]
F --> I
G --> I
H --> I
I --> J{性能改善?}
J --> |是| K[部署优化]
J --> |否| L[深入分析]
L --> C
备份与恢复 #
-
定期备份:
- 设置自动化备份任务
- 验证备份完整性
- 测试恢复流程
-
灾难恢复:
- 制定恢复计划
- 维护备用存储
- 培训运维团队
节来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L64-L84)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L50-L70)
结论 #
CheckpointStore 接口为 langgraphgo 提供了强大而灵活的状态持久化能力。通过统一的接口设计,开发者可以轻松切换不同的存储后端,满足不同场景的需求。正确的实现和优化能够显著提升系统的可靠性和性能。
在实际应用中,建议根据具体需求选择合适的存储方案,并实施适当的监控和维护策略,以确保系统的稳定运行。